Skip to content

[GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path)#12320

Merged
lgbo-ustc merged 7 commits into
apache:mainfrom
ggjh-159:fix/print-sink-multi-parallelism
Jun 25, 2026
Merged

[GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path)#12320
lgbo-ustc merged 7 commits into
apache:mainfrom
ggjh-159:fix/print-sink-multi-parallelism

Conversation

@ggjh-159

Copy link
Copy Markdown
Contributor

fix: #12306
related prs: bigo-sg/velox#49 bigo-sg/velox4j#39

What changes are proposed in this pull request?

This PR consumes the companion velox PR (which now writes to std::cout / std::cerr):

  • Drop the file-path resolution and the Configuration / CoreOptions / ConfigConstants imports.
  • PrintSinkFactory.extractPrintOptions reflects into the RowDataPrintFunctionPrintSinkOutputWriter to read the user-supplied sinkIdentifier (print-identifier) and target (true = stderr) fields, and passes them through to PrintTableHandle(tableName, inputColumns, printIdentifier, isStdErr).
  • The velox C++ side computes the Flink-style N> prefix from parallelism / task_index session properties and serializes concurrent subtask writes with a process-wide mutex.

How was this patch tested?

  • UTs
  • Manual run on a standalone Flink cluster with parallelism.default = 2, nexmark events.num = 10000, tps = 2000, query q0: every bid row reaches taskmanager.out, each line carries the 1> / 2> subtask prefix, no truncation across subtasks.

@ggjh-159 ggjh-159 force-pushed the fix/print-sink-multi-parallelism branch from 914fab7 to bc074f6 Compare June 18, 2026 06:12
@github-actions github-actions Bot added the INFRA label Jun 18, 2026
Field idField = writer.getClass().getDeclaredField("sinkIdentifier");
idField.setAccessible(true);
Field stdErrField = writer.getClass().getDeclaredField("target");
stdErrField.setAccessible(true);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ReflectUtils.getObjectField() should be simpler

// Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true =
// stderr).
// Package-private for direct unit testing.
static String[] extractPrintOptions(Transformation<RowData> transformation) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the readability of String[] is bad, may we can use class PringOptions instead?

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PrintSinkFactoryTest {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlutenStreamingTestBase.runAndCheck() still read taskmanager.out, please check whether it should be fixed together?

@ggjh-159 ggjh-159 force-pushed the fix/print-sink-multi-parallelism branch from 8c5eba9 to adcbeaf Compare June 24, 2026 09:10
@ggjh-159 ggjh-159 requested a review from KevinyhZou June 24, 2026 09:17
@ggjh-159

Copy link
Copy Markdown
Contributor Author

cc @KevinyhZou

@ggjh-159

Copy link
Copy Markdown
Contributor Author

@lgbo-ustc Would you mind re-triggering the CI for this PR? I've pushed a new commit and would like to confirm the Flink tests.

@KevinyhZou KevinyhZou left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@KevinyhZou

KevinyhZou commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

CI failed (#12358), please disable nexmark q12 test first, we will open another pr:#12363 to fix this problem @ggjh-159

@ggjh-159

ggjh-159 commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

CI failed (#12358), please disable nexmark q12 test first, we will open another pr:#12363 to fix this problem @ggjh-159

Ok, q12 is disabled now.

… captured fd=1

Reverts b561569. That commit removed File.mkdirs() under the mistaken
assumption that CoreOptions.FLINK_LOG_DIR would create the dir. Under the
dup2(fd=1) capture mechanism, Flink never creates that dir, so C_LIB.open()
returned -1 and ScalarFunctionsTest/ScanTest failed with FlinkRuntimeException
at runAndCheck:147. Restoring the 59015d1 state where mkdirs() is the dir
creator and FLINK_LOG_DIR is intentionally absent (dup2 makes it redundant).
@ggjh-159

Copy link
Copy Markdown
Contributor Author

@lgbo-ustc Sorry, I accidentally reverted a previous unit test fix. It's now restored.

Full local run of the gluten-flink ut module:

image Could you please rebuild the CI?

@lgbo-ustc lgbo-ustc merged commit 649eb78 into apache:main Jun 25, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FLINK][VL] PrintSink Crashes Under Multi-Parallelism

3 participants